package org.zjvis.datascience.common.etl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.enums.ETLEnum;
import org.zjvis.datascience.common.enums.SubTypeEnum;
import org.zjvis.datascience.common.exception.UnionStatementParserException;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.util.ToolUtil;
import org.zjvis.datascience.common.vo.TaskVO;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * ETL operator that combines the selected columns of two parent tables with a SQL
 * {@code UNION} or {@code UNION ALL} and exposes the result as a view.
 *
 * <p>The configuration keys read by this class are {@code mod}, {@code left}, {@code right},
 * {@code conditions}, {@code autoUnionRecommendation}, {@code input} and {@code output}.
 *
 * @description ETL-UNION multi-table union operator
 * @date 2021-12-27
 */
public class Union extends BaseETL {

    private final static Logger logger = LoggerFactory.getLogger("union");

    /** Minimum score a recommended rule must reach to be accepted automatically. */
    private static final double MIN_REC_SCORE = 1.0;

    /** {@code mod} value that selects {@code UNION}. */
    private static final int MOD_UNION = 1;

    /** {@code mod} value that selects {@code UNION ALL}. */
    private static final int MOD_UNION_ALL = 2;

    // 1=union; 2=union all
    private int mod;

    private JSONArray rules = new JSONArray();

    private List<String> leftFields;

    private List<String> rightFields;

    public Union() {
        super(ETLEnum.UNION.name(), SubTypeEnum.ETL_OPERATE.getVal(),
                SubTypeEnum.ETL_OPERATE.getDesc());
        this.maxParentNumber = 2;
    }

    /**
     * @return the minimum score a recommended rule needs in order to be applied
     */
    public static double getMinRecScore() {
        return MIN_REC_SCORE;
    }

    /**
     * Reads the union mode, the left/right field lists and the union rules from {@code conf}.
     *
     * <p>When {@code autoUnionRecommendation} is present and {@code conditions} is missing or
     * empty, every recommended rule whose score is at least {@link #MIN_REC_SCORE} is accepted
     * (its {@code operator} entry is removed) and a copy of the accepted rules is written back
     * to {@code conf} under {@code conditions}. Otherwise a non-empty {@code conditions} array
     * is used as-is.
     *
     * @param conf node configuration; must contain the {@code left} and {@code right} arrays
     */
    public void parserConf(JSONObject conf) {
        this.mod = conf.getIntValue("mod");
        leftFields = conf.getJSONArray("left").toJavaList(String.class);
        rightFields = conf.getJSONArray("right").toJavaList(String.class);
        if (leftFields.size() != rightFields.size()) {
            // Error: each UNION query must have the same number of columns
            logger.error("leftField's number is not equal with rightField's name");
        }

        JSONArray conditions = conf.getJSONArray("conditions");
        boolean hasConditions = conditions != null && conditions.size() > 0;

        if (conf.containsKey("autoUnionRecommendation") && !hasConditions) {
            // A recommendation has been produced and no conditions are configured yet:
            // initialise the rules from the recommendation result.
            // A fresh array is used so that repeated calls do not accumulate duplicates.
            JSONArray accepted = new JSONArray();
            JSONArray recommendRules = conf.getJSONArray("autoUnionRecommendation");
            int count = recommendRules == null ? 0 : recommendRules.size();
            for (int i = 0; i < count; i++) {
                JSONObject rule = recommendRules.getJSONObject(i);
                if (rule == null) {
                    continue;
                }
                // A rule without a score cannot be ranked, so it is skipped instead of
                // failing on unboxing a null Double.
                Double score = rule.getDouble("score");
                if (score != null && score >= MIN_REC_SCORE) {
                    rule.remove("operator");
                    accepted.add(rule);
                }
            }
            rules = accepted;
            conf.put("conditions", SerializationUtils.clone(rules));
        } else if (hasConditions) {
            // The user configured the union rules explicitly.
            rules = conditions;
        }
    }

    /**
     * Placeholder for a type-compatibility check; currently accepts every pair of types.
     * It is not called from within this class.
     */
    private boolean isTypeCompatible(int left, int right) {
        return true;
    }

    /**
     * Checks that the left and right field lists have the same length and that each pair of
     * positionally matching columns has an equal column type.
     *
     * @param sqlHelper1 helper used to look up the left table's column types
     * @param sqlHelper2 helper used to look up the right table's column types
     * @return {@code true} when every column pair is known and has equal types
     */
    private boolean validate(SqlHelper sqlHelper1, SqlHelper sqlHelper2) {
        if (leftFields.size() != rightFields.size()) {
            logger.error("leftFields's number is not equal with rightField's number");
            return false;
        }
        List<String> leftColumns = SqlTemplate.getColumns(leftFields);
        List<String> rightColumns = SqlTemplate.getColumns(rightFields);
        for (int index = 0; index < leftColumns.size(); ++index) {
            String leftKey = leftColumns.get(index);
            String rightKey = rightColumns.get(index);
            // Look each type up once and reuse it for both the null and the equality check.
            Object leftType = sqlHelper1.getColumnType(leftKey);
            Object rightType = sqlHelper2.getColumnType(rightKey);
            if (leftType == null || rightType == null) {
                logger.error("column type not found, left column: {}, right column: {}",
                        leftKey, rightKey);
                return false;
            }
            if (!leftType.equals(rightType)) {
                logger.error("column type is not equal, please check");
                return false;
            }
        }
        return true;
    }

    /**
     * Builds the statement that creates the output view for this union.
     *
     * <p>The record-id column is removed from both entries of {@code conf}'s {@code input}
     * array as a side effect. The generated query numbers the combined rows with
     * {@code row_number() over()} as {@code _record_id_}.
     *
     * @param conf       node configuration holding the {@code input} and {@code output} arrays
     * @param sqlHelpers helpers for the parent tables; exactly two are required
     * @param timeStamp  passed to {@code ToolUtil.alignTableName} for every table name
     * @param engineName stored on this instance
     * @return the create-view statement, or {@code null} when the configuration is invalid
     */
    public String initSql(JSONObject conf, List<SqlHelper> sqlHelpers, long timeStamp,
                          String engineName) {
        this.engineName = engineName;
        if (sqlHelpers.size() != 2) {
            logger.error("sqlHelpers size is not 2");
            return null;
        }
        JSONArray input = conf.getJSONArray("input");
        if (input == null || input.size() != 2) {
            logger.error("input size is not equal with 2!!!!");
            return null;
        }
        String leftTable = ToolUtil.alignTableName(
                input.getJSONObject(0).getString("tableName"), timeStamp);
        String rightTable = ToolUtil.alignTableName(
                input.getJSONObject(1).getString("tableName"), timeStamp);
        eliminateRowId(input);

        // Keep the aliased column list local: storing it back into the field would make a
        // second call rename the already renamed expressions.
        List<String> aliasedRightFields = SqlTemplate.columnsRenameSql(rightFields, "b");

        JSONArray output = conf.getJSONArray("output");
        if (output == null || output.size() == 0) {
            logger.error("output is empty!!!!");
            return null;
        }
        String outputTable = ToolUtil.alignTableName(
                output.getJSONObject(0).getString("tableName"), timeStamp);

        String unionKeyword;
        if (mod == MOD_UNION) {
            unionKeyword = " UNION ";
        } else if (mod == MOD_UNION_ALL) {
            unionKeyword = " UNION ALL ";
        } else {
            logger.error("unsupported union mod: {}", mod);
            return null;
        }

        // Plain concatenation is used on purpose: column names are user data and must not be
        // interpreted as String.format patterns (a '%' in a name would break formatting).
        String leftSql = "SELECT " + Joiner.on(", ").join(leftFields)
                + " FROM " + leftTable + " a";
        String rightSql = "SELECT " + Joiner.on(", ").join(aliasedRightFields)
                + " FROM " + rightTable + " b";
        String selectSql = "SELECT row_number() over() as _record_id_, * from ("
                + leftSql + unionKeyword + rightSql + ") c";
        return String.format(SqlTemplate.CREATE_VIEW_SQL, outputTable, selectSql);
    }

    /**
     * Removes the record-id column, and its column type, from the first two input tables.
     *
     * @param input the {@code input} array of the node configuration
     */
    private void eliminateRowId(JSONArray input) {
        stripRowId(input.getJSONObject(0));
        stripRowId(input.getJSONObject(1));
    }

    /**
     * Removes {@code ToolUtil.ID_COL} from {@code tableCols} and the entry at the same position
     * from {@code columnTypes}; does nothing when the column or the arrays are absent.
     */
    private void stripRowId(JSONObject table) {
        if (table == null) {
            return;
        }
        JSONArray colArray = table.getJSONArray("tableCols");
        if (colArray == null) {
            return;
        }
        List<String> cols = colArray.toJavaList(String.class);
        int index = cols.indexOf(ToolUtil.ID_COL);
        if (index < 0) {
            return;
        }
        cols.remove(index);
        table.put("tableCols", cols);

        JSONArray typeArray = table.getJSONArray("columnTypes");
        if (typeArray == null) {
            return;
        }
        List<String> types = typeArray.toJavaList(String.class);
        // Only drop the type when the list really has an entry at that position.
        if (index < types.size()) {
            types.remove(index);
            table.put("columnTypes", types);
        }
    }

    /**
     * Computes the {@code output} description of this node and stores it in the task data.
     *
     * <p>Returns without changes when {@code left} or {@code right} is missing. When two inputs
     * are available the column types are validated first; on failure a
     * {@link UnionStatementParserException} is set on {@code vo} and no output is written.
     * The first input's {@code numberFormat} is reduced in place to the selected left fields.
     *
     * @param vo task whose data is read and updated
     */
    public void defineOutput(TaskVO vo) {
        JSONObject jsonObject = vo.getData();
        JSONArray leftArray = jsonObject.getJSONArray("left");
        JSONArray rightArray = jsonObject.getJSONArray("right");
        if (leftArray == null || rightArray == null) {
            return;
        }
        String tableName = String
                .format(SqlTemplate.VIEW_TABLE_NAME, vo.getPipelineId(), vo.getId());
        List<String> selectedLeft = leftArray.toJavaList(String.class);
        List<String> selectedRight = rightArray.toJavaList(String.class);
        this.leftFields = selectedLeft;
        this.rightFields = selectedRight;

        JSONArray outputColumnTypes = new JSONArray();
        JSONArray input = jsonObject.getJSONArray("input");
        if (input != null && input.size() >= 2) {
            List<SqlHelper> sqlHelpers = new ArrayList<>();
            ToolUtil.getSqlHelpers(input, sqlHelpers);
            if (!validate(sqlHelpers.get(0), sqlHelpers.get(1))) {
                vo.setException(new UnionStatementParserException("连接条件中存在字段类型不匹配!"));
                return;
            }
            List<String> colNames = input.getJSONObject(0).getJSONArray("tableCols")
                    .toJavaList(String.class);
            List<String> colTypes = input.getJSONObject(0).getJSONArray("columnTypes")
                    .toJavaList(String.class);
            this.prepareOutputColumnTypes(outputColumnTypes, selectedLeft, colNames, colTypes);
        }

        // "input" may be absent, so only read the first entry when there is one.
        JSONObject numberFormat = null;
        if (input != null && !input.isEmpty() && input.getJSONObject(0) != null) {
            numberFormat = input.getJSONObject(0).getJSONObject("numberFormat");
        }
        if (numberFormat != null && !selectedLeft.isEmpty()) {
            // Drop formats of columns that are not selected. retainAll is used because removing
            // entries while iterating over keySet() fails with ConcurrentModificationException.
            numberFormat.keySet().retainAll(selectedLeft);
        }

        JSONObject item = new JSONObject();
        item.put("numberFormat", numberFormat);
        item.put("tableName", tableName);
        item.put("tableCols", selectedLeft);
        item.put("nodeName", vo.getName() == null ? ETLEnum.UNION.toString() : vo.getName());
        item.put("columnTypes", outputColumnTypes);
        item.put("semantic", new JSONObject());
        this.setSubTypeForOutput(item);

        JSONArray output = new JSONArray();
        output.add(item);
        jsonObject.put("output", output);
        vo.setData(jsonObject);
    }

    /**
     * Fills {@code data} with the default configuration of a union node: empty {@code left}
     * and {@code right} lists, {@code mod} 1 (UNION) and the current rules as {@code conditions}.
     *
     * @param data template object to populate
     */
    public void initTemplate(JSONObject data) {
        data.put("left", new JSONArray());
        data.put("right", new JSONArray());
        data.put("mod", 1);
        data.put("conditions", rules);
        baseInitTemplate(data);
    }

}
